import pandas as pd
from scipy.signal import savgol_filter
import scipy.signal
import numpy as np
import holoviews as hv
import bokeh.plotting
import bokeh.io
import bokeh.driving
import bebi103
import iqplot
bebi103.hv.set_defaults()
bokeh.io.output_notebook()
hv.extension("bokeh")
%load_ext blackcellmagic
/Users/saehuihwang/opt/anaconda3/envs/ee189/lib/python3.8/site-packages/arviz/__init__.py:317: UserWarning: Trying to register the cmap 'cet_gray' which already exists.
register_cmap("cet_" + name, cmap=cmap)
/Users/saehuihwang/opt/anaconda3/envs/ee189/lib/python3.8/site-packages/arviz/__init__.py:317: UserWarning: Trying to register the cmap 'cet_gray_r' which already exists.
register_cmap("cet_" + name, cmap=cmap)
The pulse oximeter presented in this report is characterized by its extremely simple circuit design and intuitive digital signal processing. The product is made with an Arduino Uno, and is easy to operate. The user interface is illustrative and simple, displaying the pulse trace, heartrate, and the blood oxygen concentration (%).
The overall working principle is based on the fact that oxyhemoglobin (O₂Hb) and deoxygenated hemoglobin (Hb) have different optical absorption spectra. By shining LEDs with different wavelengths and measuring the absorption as the light passes through human tissue, we can obtain a metric for the blood oxygen concentration. The heartrate is simply measured by how the absorption changes over time.
This device has an extremely minimal BOM.
Below, the wiring is presented. The green wire represents the data line, red wire represents $V_{DD} = 5V$, black wire represents ground, purple and white wires are for the LEDs.

Below, the circuit schematic is presented. Similar coloring conventions have been used as the wiring diagram for readability. The resistor values were chosen according to the project specifications.

Since the all of signal processing happened digitally, let us go over each step of the code, using the below pseudocode. Detailed implementation can be found in pulseox_app.py and in commented_pulse_ox.ipynb.
The raw code is attached to the submission, and below is the pseudocode.
The 5ms delay before reading from the ADC is there to ensure that there is time for the LED current to reach its intended value and thus fully emit light.
This results in a total data acquisition time of 15 milliseconds. Considering a baud rate of 115200, We can running the .sizeof() on the output string on the arduino, which returned 6 bytes. Let us say each byte is 10 bits for flexibility. This gives us 1920 outputs per second. However, we do discard several reads to account for data corruption, so let us say we get 1500 outputs per second. This gives us 0.6ms interval between each output read. Therefore, we estimate that the arduino itself is able to handle 16ms intervals between each output.
Every 3 seconds
take_snapshot(V1, V2, n = 200 samples)scipy.stats.trim_mean(heartrates, 0.05))scipy.stats.trim_mean(oxygen, 0.05))take_snapshot(V1, V2, 100 samples)
len([V1_peak_ind])==len([V2_peak_ind]):The data acquisition delay in the arduino was 16ms. Giving this some headroom to be 30ms, we would obtain 100 samples in about 3 seconds. The take_snapshot() function is then called approximately every 3 seconds and extracts heartrate/oxygen for the past n=200 samples (6 seconds) of pulse data. The take_snapshot() function is utilized so that we would not be printing the information too fast in a way that would be hard to comprehend by the user, and to minimize error through averaging. Considering the resting human heartrate ranges from 60bpm to 100bpm, a window of 6 seconds would get at least 6 beats and at most 10 beats. A 20 year old's running heart rate goes up to 200bpm, in which the window would contain almost 20 heartbeats. By obtaining multiple heartrate/oxygen for a given window, we can extract outliers and take the average to prevent DSP errors from confusing the user. This design is forgiving on sacrificing some data for the ease of use and readability for the user.
Within the take_snapshot() function, the DC value is extracted using a moving average filter. The parameter was chosen such that the averaging window would be approximately the length of a pulse. This allows us to extract the DC value most accurately. Since we are getting 3 ~ 5 beats in 100 samples, the averaging window was chosen to be n=20. We want our window to cover a section of a pulse. A small caveat to this method is that the output of the rolling average is smaller in length than the input array. So we simply repeat the last DC value to extend the length of the DC array. Next, the peak indices are found using scipy.signal.find_peaks. The prominence parameter was found by trial and error of visual inspection. Then oxygen concentration was calculated using the calibration curve given by Tremper and Barker. One important line of code here is checking that the number of peaks in the IR signal is equal to the number of peaks in the Red signal. Otherwise, it wouldn't make sense to calculate the ratio between the heights of the two curves. The heartrate was calculated by using the inverse of the inter-peak times of V2, which is the IR voltage. This design choice was made because the IR signal was stronger in magnitude compared to the Red signal. By using only the IR signal, we reduce logic complexity and increase reliability. The output of the take_snapshot() function is an array of oxygen levels and heartrates, with n=number of peaks elements in the arrays.
Four ColumnDataSources are maintained and updated. data, which contains the raw voltage values and time in seconds, peak_time_source, which contains the time (in milliseconds) at which the peaks occur, heartrate_source, and oxygen_source for heartrate and oxygen data. Each of these information to be maintained in a separate dictionary since they may update at different rates. We store the peak_time_source to know when to take the next snapshot (i.e. when the current time is more than 3 seconds than the last peak that was extracted)
When displaying the oxygen/heartrates, we display by averaging the array of oxygen/heartrates outputted by take_snapshot(). We exclude outliers by using the scipy.stats.trim_mean(5%) function, which slices off the left most and rightmost 5% of the array.
The user interface looks as below

The heartrate and oxygen concentration level is printed in large bold letters for readability. The raw pulse traces are shown, and the plot has an automatically adjusting y range to accomodate for drift and differences in voltage values for different users. The acquire button is used to start and pause the instrument, the save button is used to save the raw traces, and the shut down button is used to shut down the app.

bokeh serve --show pulseox_app.py on the command line. save and the .csv file will be saved in the local directory. Shut Down button button, and the device will shut down. The attached Demonstration.mp4 file demonstrates that the device functions properly. The analysis section below verifies key segments of the DSP in a modular, visual basis.
Let us first take a look at some of the raw waveforms collected by the app. First, here is a generic pulse.
pulse = pd.read_csv("stream.csv")[-500:]
p = bokeh.plotting.figure(
frame_width=500,
frame_height=175,
x_axis_label="time (s)",
y_axis_label="voltage (V)",
title="Saehui's pulse",
toolbar_location="above",
)
p.line(
source=pulse,
x="time (ms)",
y="Voltage1 (V)",
visible=True,
line_color="#ef8a62",
legend_label="Red LED",
)
p.line(
source=pulse,
x="time (ms)",
y="Voltage2 (V)",
visible=True,
line_color="#67a9cf",
legend_label="IR LED",
)
p.legend.orientation = "horizontal"
p.add_layout(p.legend[0], "below")
bokeh.io.show(p)
Now we will demonstrate that the DC voltage extraction works as expected. Recall that we use a moving average to construct the DC voltage array, and that we must extend it in order to make the output the same length as the input.
v1_dc = pulse["Voltage1 (V)"].rolling(20).mean().dropna().tolist()
v2_dc = pulse["Voltage2 (V)"].rolling(20).mean().dropna().tolist()
def extend_moving_avg(l, length):
"""Since moving average returns an array that is smaller than the
input array, extend the last output of the moving averaged array to
make it the same length as its pre-filter array"""
l.extend([l[-1]] * (length - len(l)))
return l
v1_dc = extend_moving_avg(v1_dc, len(pulse))
v2_dc = extend_moving_avg(v2_dc, len(pulse))
p.line(
x=pulse["time (ms)"],
y=v1_dc,
line_color="#b2182b",
line_dash="dashed",
legend_label="Red DC",
)
p.line(
x=pulse["time (ms)"],
y=v2_dc,
line_color="#2166ac",
line_dash="dashed",
legend_label="IR DC",
)
p.yaxis.visible = False
p.legend.orientation = "horizontal"
p.add_layout(p.legend[0], "below")
bokeh.io.show(p)
Great! Now we will show the peakfinding ability.
v1_peak_inds = scipy.signal.find_peaks(pulse["Voltage1 (V)"], prominence=0.005)[0]
v2_peak_inds = scipy.signal.find_peaks(pulse["Voltage2 (V)"], prominence=0.005)[0]
v1_peak_times = pulse["time (ms)"][v1_peak_inds]
v2_peak_times = pulse["time (ms)"][v2_peak_inds]
p.circle(x=v1_peak_times, y=pulse["Voltage1 (V)"][v1_peak_inds], color='#053061')
p.circle(x=v2_peak_times, y=pulse["Voltage2 (V)"][v2_peak_inds], color='#67001f')
bokeh.io.show(p)
We can also check the oxygen level processing algorithm.
v1_peak_ac = np.array(pulse["Voltage1 (V)"])[v1_peak_inds]
v2_peak_ac = np.array(pulse["Voltage2 (V)"])[v2_peak_inds]
v1_peak_dc = np.array(v1_dc)[v1_peak_inds]
v2_peak_dc = np.array(v2_dc)[v2_peak_inds]
a=-3.3
b=-21.1
c=109.6
R = (v1_peak_ac/v1_peak_dc)/(v2_peak_ac/v2_peak_dc)
oxygen = a * R**2 + b * R + c
print(f"all oxygen levels: {oxygen}")
print(f"average in window: {np.average(oxygen)}")
all oxygen levels: [85.52175351 85.52271754 85.60462412 85.4615658 85.43651766 85.43209197 85.45516508] average in window: 85.49063366954331
These are very reasonable values!
Now let us verify our DSP algorithm for two scenarios: resting stance, and after 1 minute of jumping jacks & high knees. First look at the raw trace.
steady = pd.read_csv("steady.csv")[-1000:]
jumping = pd.read_csv("jumpingjacks.csv")[-1000:]
steady["time (ms)"] = jumping["time (ms)"].to_numpy()
steady = steady.reset_index()
jumping = jumping.reset_index()
df = pd.concat([steady, jumping], keys=["steady", "jumping"])
df.reset_index(level=0, inplace=True)
df = df.rename(columns={"level_0": "action"})
p_steady = bokeh.plotting.figure(
frame_width=500,
frame_height=175,
x_axis_label="time (s)",
y_axis_label="voltage (V)",
title="Steady Heartbeat",
toolbar_location="above",
)
p_steady.line(
source=df.loc[df['action']=='steady'],
x="time (ms)",
y="Voltage2 (V)",
visible=True,
line_color="#67a9cf",
legend_label="IR LED",
)
p_jumping = bokeh.plotting.figure(
frame_width=500,
frame_height=175,
x_axis_label="time (s)",
y_axis_label="voltage (V)",
title="Jumping Jack Heartbeat",
toolbar_location="above",
)
p_jumping.line(
source=df.loc[df['action']=='jumping'],
x="time (ms)",
y="Voltage2 (V)",
visible=True,
line_color="#ef8a62",
legend_label="Red LED",
)
p_steady.legend.orientation = "horizontal"
p_steady.add_layout(p_steady.legend[0], "below")
p_jumping.legend.visible = False
p_jumping.legend.orientation = "horizontal"
p_jumping.add_layout(p_jumping.legend[0], "below")
bokeh.io.show(bokeh.layouts.row(p_steady, p_jumping))
Even by eye, we can see that after the jumping jacks the heart beats a lot faster. Now we check the peak finding algorithm.
v2_peak_inds_steady = scipy.signal.find_peaks(steady["Voltage2 (V)"], prominence=0.005)[0]
v2_peak_inds_jumping = scipy.signal.find_peaks(jumping["Voltage2 (V)"], prominence=0.005)[0]
v2_peak_times_steady = steady["time (ms)"][v2_peak_inds_steady]
v2_peak_times_jumping = jumping["time (ms)"][v2_peak_inds_jumping]
p_steady.circle(x=v2_peak_times_steady, y=steady["Voltage2 (V)"][v2_peak_inds_steady], color='#67001f')
p_jumping.circle(x=v2_peak_times_jumping, y=jumping["Voltage2 (V)"][v2_peak_inds_jumping], color='#67001f')
bokeh.io.show(bokeh.layouts.row(p_steady, p_jumping))
We will visualize the distribution of the heartrates
heartrates_steady = np.array(1)/np.ediff1d(v2_peak_times_steady) * 1000 * 60
heartrates_jumping = np.array(1)/np.ediff1d(v2_peak_times_jumping) * 1000 * 60
ecdf_steady=iqplot.ecdf(heartrates_steady, legend_label="steady", title="Heart rate after activity", palette=['#ef8a62'])
ecdf_jumping = iqplot.ecdf(heartrates_jumping, p=ecdf_steady, legend_label = "jumping")
bokeh.io.show(ecdf_jumping)
We can clearly see that after 1 minute of jumping jacks, the heartrate increased. This demonstrates that our method of heartrate extraction works as expected.
Several design changes can be made.
1. Use a TIA intead of a resistor with the photodiode. This is a relatively simple change to make to the circuit. We would just replace the resistor with an op amp with feedback, as shown below. This configuration would allow for faster response time and operation of the photodiode in the linear region. Furthermore, the op amp circuit would load the photodiode less and present less output impedance. In the circuit below, $I_{photodiode} = \frac{V_{out}}{R_f}$.

2. Calibrate device The presented pulse oxymeter is a non invasive device that uses a calibration curve obtained by [Tremper and Barker].(https://pubs.asahq.org/anesthesiology/article/70/1/98/30812/Pulse-Oximetry). However, variability in the LEDs and the photodiodes make it likely that the calibration curve for our particular sensor s different than the one obtained by Trember and Barker. To obtain a calibration curve, one may consider using an invasive blood oxygen analyzer to fit a curve for $a * R^2 + b * R + c $, then use the obtained $a, b, c$ in the code.
3. UI Although the DSP algorithm is robust even with the evident drift in the signal, the user does not care about the drift of the signal. To improve user experience, the UI can only include the trace of one of the LEDs, and the DC value can be subtracted so that the drift is not shown on the display. Also, it would be nice for the user to be able to download the pulse information as well.